[AWS IoT Greenglass] YOLOv5(物体検出モデル)を ONNX Runtimeで使用し、室内の人数をリアルタイムで確認できるカスタムコンポーネントを作ってみました

[AWS IoT Greenglass] YOLOv5(物体検出モデル)を ONNX Runtimeで使用し、室内の人数をリアルタイムで確認できるカスタムコンポーネントを作ってみました

Clock Icon2023.07.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部 delivery部の平内(SIN)です。

一般的に、エッジ側で機械学習の推論を行うことは、レイテンシーやスケーラビリティでメリットがあります。

今回は、その一例として、YOLOv5(物体検出モデル)をONNXフォーマットにエクスポートして、AWS IoT Greengras 上の ONNX Runtime で推論してみました。

イメージしたのは、オフィスや、店舗に設置されたカメラで人物を検出し、リアルタイムで何人いるのかをクラウドに送信するソリューソンです。

2 構成

構成は、以下の通りです。
❶ YOLOv5のモデルは、ONNXフォーマットにエクスポートして、コンポーネントの一部としてS3へ置かれます
❷ モデルを含んだコンポーネントは、AWS IoT Greenglassでエッジ側にデプロイされます
❸ エッジデバイスでは、カメラの画像を推論し、人物を検出します
❹ 検出した人数は、リアルタイムでIoT Coreに送信されます

点線で囲われた部分は、想定となっています。 エッジ側のカメラ部分は、予め用意した画像を順次使用する事とし、また、データの閲覧に関しては、IoT Coreのメッセージブローカーへのデータ到着を確認するまでとなっています。

3 RaspberryPi

エッジデバイスとして使用したのは、RaspberryPi 4B(4G) で、OSは、今年5月の最新版(2023-05-03) Raspberry Pi OS (64-bit) A port Debian Bullseye with the Respbeyy Pi Desktop (Compatible with Raspberry Pi 3/4/400)です。

$ cat /proc/cpuinfo  | grep Revision
Revision    : c03112

$ lsb_release -a
No LSB modules are available.
Distributor ID: Debian
Description:    Debian GNU/Linux 11 (bullseye)
Release:    11
Codename:   bullseye

$ uname -a
Linux raspberrypi 6.1.21-v8+ #1642 SMP PREEMPT Mon Apr  3 17:24:16 BST 2023 aarch64 GNU/Linux

$ getconf LONG_BIT
64

後述するレシピでは、必要なライブラリをpipでインストールしていますが、32ビットOSだと、pipだけでインストールできないものもあるので注意が必要です。

4 YOLOv5

YOLOv5のリポジトリで、簡単にONNXフォーマットのモデルが作成(エクスポート)できます。

$ git clone https://github.com/ultralytics/yolov5
$ cd yolov5
$ pip install -r requirements.txt

$ python3 export.py --weights yolov5s.pt --include onnx

% ls -la *.onnx
-rw-r--r--  1 user  staff  29352400  6  4 07:07 yolov5s.onnx

参考:https://docs.ultralytics.com/yolov5/tutorials/model_export/

5 Component

作成したコンポーネント(com.example.OnnxSample)は、次のような構成になっています。

index.pyが、エッジ上で動作するロジックで、機械学習のモデル(yolov5s.onnx)は、model の下に置かれています。images の中に置かれた画像は、カメラの入力を模擬するために使用されています。

% tree .
.
├── images
│   ├── image_001.png
│   ├── image_002.png
│   ├── image_003.png
│   ├── image_004.png
│   └── image_005.png
├── index.py
└── model
    └── yolov5s.onnx

この構成を、zipで固めて、S3バケットに送信しています。

% zip -r onnx-sample.zip .
% aws s3 cp onnx-sample.zip s3://gg-artifacts-2023-01-04/onnx-sample.zip
% rm onnx-sample.zip

6 recipes

レシピです。上で用意したzipファイルを Artifacts: で使用しています。

必要なライブラリ(awsiotsdk opencv-python onnxruntime torch torchvision onnx)は、 Lifecycle:Install で配置されます。MQTTでの送信は、aws.greengrass.ipc.mqttproxy で権限付与されています。

---
RecipeFormatVersion: "2020-01-25"
ComponentName: "com.example.OnnxSample"
ComponentVersion: "1.1.0"
ComponentType: "aws.greengrass.generic"
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.mqttproxy:
        com.demo.greengrass-onnx:mqttproxy:1:
          policyDescription: "Publish inference results to topic"
          operations:
          - "aws.greengrass#PublishToIoTCore"
          resources:
          - "*"
Manifests:
- Platform:
    os: "linux"
  Name: "Linux"
  Lifecycle:
    Install: "pip install awsiotsdk opencv-python onnxruntime torch torchvision onnx"
    Run: "python3 -u {artifacts:decompressedPath}/onnx-sample/index.py"
  Artifacts:
  - Uri: "s3://gg-artifacts-2023-01-04/onnx-sample.zip"
    Unarchive: "ZIP"
Lifecycle: {}

7 カメラ入力(模擬)

カメラ入力を模擬するための画像は、以下の5枚です。

それぞれの画像をyolov5s.onnxで推論すると、いくつかのオブジェクト(人物、イス、コンピュータ、プランターなど)が検出されますが、その中で Personと検出されたものだけをピックアップするようにしました。

それぞれの画像で、検出されるPersonは、以下のようになっています。

images_001.png: 8人
images_002.png: 2人
images_003.png: 0人
images_004.png: 1人
images_005.png: 6人

8 動作確認

コンポーネントの動作が始まると、5秒に1回、メッセージブローカーにデータが到着します。 予め確認したとおりの「人数」が検出され、送信されていることが確認できました。

9 コード

最後に、コンポーネントで動作しているコードです。

YOLOv5のリポジトリで提供されている、detect.pyを参考にさせていただいております。

分類モデルと違って、推論後の属性抽出がやや複雑になってしまいますが、YOLOv5を使う場合の共通コードとして利用可能だと思います。

コードには、検出したオブジェクトの位置(バウンディングボックス)も処理されていますが、今回のように、人物のカウントだけが要件であった場合は、削ってしまっても問題ありません。

index.py

import os
import time
import json
import math
import cv2
import numpy as np
import torch
import torchvision
import onnxruntime

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import QOS, PublishToIoTCoreRequest

topic = "demo/onnx"
qos = QOS.AT_LEAST_ONCE
ipc_client = awsiot.greengrasscoreipc.connect()
scriptPath = os.path.abspath(os.path.dirname(__file__))
modelPath = scriptPath + "/model/yolov5s.onnx"
imagesPath = scriptPath + "/images"


def xywh2xyxy(x):
    y = x.clone()
    y[..., 0] = x[..., 0] - x[..., 2] / 2  # top left x
    y[..., 1] = x[..., 1] - x[..., 3] / 2  # top left y
    y[..., 2] = x[..., 0] + x[..., 2] / 2  # bottom right x
    y[..., 3] = x[..., 1] + x[..., 3] / 2  # bottom right y
    return y


def non_max_suppression(
    prediction,
    conf_thres=0.25,
    iou_thres=0.45,
    agnostic=False,
    labels=(),
    max_det=300,
    nm=0,
):
    bs = prediction.shape[0]  # batch size
    nc = prediction.shape[2] - nm - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    max_wh = 7680  # (pixels) maximum box width and height
    max_nms = 30000  # maximum number of boxes into torchvision.ops.nms()
    time_limit = 0.5 + 0.05 * bs  # seconds to quit after

    t = time.time()
    mi = 5 + nc
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        x = x[xc[xi]]  # confidence
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 5), device=x.device)
            v[:, :4] = lb[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(lb)), lb[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        if not x.shape[0]:
            continue
        x[:, 5:] *= x[:, 4:5]
        box = xywh2xyxy(x[:, :4])
        mask = x[:, mi:]

        conf, j = x[:, 5:mi].max(1, keepdim=True)
        x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        n = x.shape[0]
        if not n:
            continue
        x = x[x[:, 4].argsort(descending=True)[:max_nms]]

        c = x[:, 5:6] * (0 if agnostic else max_wh)
        boxes, scores = x[:, :4] + c, x[:, 4]
        i = torchvision.ops.nms(boxes, scores, iou_thres)
        i = i[:max_det]

        output[xi] = x[i]
        if (time.time() - t) > time_limit:
            break
    return output


def letterbox(
    im,
    new_shape=(640, 640),
    color=(114, 114, 114),
    scaleup=True,
    stride=32,
):
    shape = im.shape[:2]  # current shape [height, width]

    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)

    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding

    dw /= 2
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(
        im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color
    )
    return im, ratio, (dw, dh)


def clip_boxes(boxes, shape):
    # Clip boxes (xyxy) to image shape (height, width)
    boxes[..., 0].clamp_(0, shape[1])  # x1
    boxes[..., 1].clamp_(0, shape[0])  # y1
    boxes[..., 2].clamp_(0, shape[1])  # x2
    boxes[..., 3].clamp_(0, shape[0])  # y2


def scale_boxes(img1_shape, boxes, img0_shape):
    gain = min(
        img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]
    )  # gain  = old / new
    pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (
        img1_shape[0] - img0_shape[0] * gain
    ) / 2  # wh padding

    boxes[..., [0, 2]] -= pad[0]  # x padding
    boxes[..., [1, 3]] -= pad[1]  # y padding
    boxes[..., :4] /= gain
    clip_boxes(boxes, img0_shape)
    return boxes


def inference(session, input_file):
    counter = 0

    max_det = 1000  # maximum detections per image
    conf_thres = 0.5  # confidence threshold
    iou_thres = 0.5  # NMS IOU threshold
    imgsz = (640, 640)
    agnostic_nms = False
    stride = 32

    meta = session.get_modelmeta().custom_metadata_map
    if "stride" in meta:
        stride, names = int(meta["stride"]), eval(meta["names"])
    fp16 = False

    imgsz = list(imgsz)
    imgsz = [max(math.ceil(x / int(stride)) * int(stride), 0) for x in imgsz]
    output_names = [x.name for x in session.get_outputs()]

    img = cv2.imread(input_file)  # BGR
    im = letterbox(img, [640, 640], stride)[0]  # padded resize
    im = im.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
    im = np.ascontiguousarray(im)  # contiguous
    im = torch.from_numpy(im)
    im = im.half() if fp16 else im.float()  # uint8 to fp16/32
    im /= 255  # 0 - 255 to 0.0 - 1.0
    im = im[None]  # [3, 640, 480] => [1, 3, 640, 480]

    # inference
    im = im.cpu().numpy()  # torch to numpy
    y = session.run(output_names, {session.get_inputs()[0].name: im})
    pred = torch.from_numpy(y[0])
    pred = non_max_suppression(
        pred, conf_thres, iou_thres, agnostic_nms, max_det=max_det
    )

    for det in pred:
        for *_xyxy, _conf, cls in det:
            if cls == 0:  # cls:0  person
                counter += 1
    return counter


session = onnxruntime.InferenceSession(modelPath, providers=["CPUExecutionProvider"])

while True:
    for img in os.listdir(imagesPath):
        request = PublishToIoTCoreRequest()
        request.topic_name = topic

        start = time.time()
        counter = inference(session, "{}/{}".format(imagesPath, img))
        end = time.time()
        inference_time = np.round((end - start) * 1000, 2)

        payload = {
            "image_file": img,
            "person": counter,
            "inference_time": inference_time,
        }

        request.payload = json.dumps(payload).encode()
        request.qos = qos
        print(payload)
        operation = ipc_client.new_publish_to_iot_core()
        operation.activate(request)
        future_response = operation.get_response().result(timeout=5)
        print("successfully published message: ", future_response)
        time.sleep(5)

10 最後に

今回は、AWS IoT Greenglassで、物体検出モデルが動作するカスタムコンポーネントを作ってみました。機械学習モデルを使用する場合、必要なライブラリのセットアップに、けっこう手間がかかりますが、その辺をクリアできれば、特に難しいところは無いと思います。また、モデルは、S3に配置されてものがデプロイされますので、チューニング等によるモデル更新のライフサイクルの、簡単に構築できるでしょう。

なお、今回使用したのは、80クラス(人、バス、車など)について事前に学習されたモデル「 YOLOv5s 」でしたが、ファインチューニングで、特定の物体を検出したりすることで、更なる応用も可能でしょう。

カメラを模擬するために使用した写真は、Pexels様のものを利用させて頂きました。

11 参考リンク


ONNX Runtime を使った AWS IoT Greengrass 上での画像分類の最適化
Inference PyTorch models on different hardware targets with ONNX Runtime
https://github.com/ultralytics/yolov5

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.